本文基于Rxjava 2.x版本,介绍用于操作多个 Observable 对象的操作符。
Operators that work with multiple source Observables to create a single Observable
- And/Then/When — combine sets of items emitted by two or more Observables by means of
Pattern
andPlan
intermediaries - CombineLatest — when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function
- Join — combine items emitted by two Observables whenever an item from one Observable is emitted during a time window defined according to an item emitted by the other Observable
- Merge — combine multiple Observables into one by merging their emissions
- StartWith — emit a specified sequence of items before beginning to emit the items from the source Observable
- Switch — convert an Observable that emits Observables into a single Observable that emits the items emitted by the most-recently-emitted of those Observables
- Zip — combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function
startWith操作符
在 Observable 序列发送之前添加起始位置指定发送的 Observable
startWith Example
1 | Observable<String> names = Observable.just("Spock", "McCoy"); |
merge 操作符
merge 操作符
merge 操作符在用来合并多个 Observable 操作,合并的序列依次调用 Emitter#onNext()
方法执行,遇到错误序列将不继续执行。
merge Example
1 | Observable.just(1, 2, 3) |
mergeDelayError 操作符
和 merge 操作符类似,不过遇到错误会保存下来,等所有 Observable 执行完毕,再处理出现错误的Observable。注意 mergeDelayError 是静态泛型方法,只可以通过类名访问。
mergeDelayError Example
1 | Observable<String> observable1 = Observable.error(new IllegalArgumentException("")); |
zip 操作符
将多个 Observable 发射的数据合并起来,并生成一个Observable根据合并后的数据继续向下传递。onNext 方法只会调用一次。zip
、zipArray
、zipIterable
、zipWith
zip Example
1 | Observable<String> firstNames = Observable.just("James", "Jean-Luc", "Benjamin"); |
zip 操作符
会将对应 Observable 对应的 Emitter onNext
的数据合并最后通过新生成的 Observable 发射调整后的数据。
combineLatest 操作符
让两个 Obserable 序列按照最新发出的数据组合在一起生成一个 Observable 序列向下传递。
combineLatest Example
1 | Observable<Long> newsRefreshes = Observable.interval(100, TimeUnit.MILLISECONDS); |
switchOnNext 操作符
switchOnNext Example
1 | Observable<Observable<String>> timeIntervals = |
参考文章:
https://github.com/ReactiveX/RxJava/wiki/Combining-Observables